-
Notifications
You must be signed in to change notification settings - Fork 126
RSDK-10422 Add world object store service #5243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RSDK-10422 Add world object store service #5243
Conversation
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…6-move-pointcloud-to-spatialmath
…2-add-world-object-store-service
services/worldstatestore/client.go
Outdated
| func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (<-chan TransformChange, error) { | ||
| ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges") | ||
| defer span.End() | ||
|
|
||
| ext, err := protoutils.StructToStructPb(extra) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext} | ||
| stream, err := c.client.StreamTransformChanges(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // Check the initial response immediately to catch early errors. | ||
| _, err = stream.Recv() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| changesChan := make(chan TransformChange, 1024) | ||
|
|
||
| go func() { | ||
| defer close(changesChan) | ||
|
|
||
| for { | ||
| resp, err := stream.Recv() | ||
| if err != nil { | ||
| if errors.Is(err, io.EOF) { | ||
| return | ||
| } | ||
| if ctx.Err() != nil || errors.Is(err, context.Canceled) { | ||
| c.logger.Debug(err) | ||
| return | ||
| } | ||
| c.logger.Errorw("failed to receive from stream", "error", err) | ||
| return | ||
| } | ||
|
|
||
| change := TransformChange{ | ||
| ChangeType: resp.ChangeType, | ||
| Transform: resp.Transform, | ||
| } | ||
|
|
||
| if resp.UpdatedFields != nil { | ||
| change.UpdatedFields = resp.UpdatedFields.Paths | ||
| } | ||
|
|
||
| select { | ||
| case changesChan <- change: | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return changesChan, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benjirewis, you don't need to review everything, but mind taking a look at how we are managing this stream to make sure there are no glaring issues? I tried a few things, and it is shutting down cleanly and working e2e.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a look here and in the server code and compared it to some server-streaming code I've seen; I see no glaring issues. Returning a channel seems pretty ergonomic to me, but I suppose you could also return a struct that implements Next() (TransformChange, error).
Also cc @viamrobotics/netcode since a whole new service type is being added here with a non-trivial amount of code. We'll have to change some small pieces of this (ShortName()) @jmatth.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, let me try that out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, returning a stream struct looks a little cleaner than returning the channel directly.
…m:DTCurrie/viam-rdk into RSDK-10422-add-world-object-store-service
services/worldstatestore/client.go
Outdated
| return nil, err | ||
| } | ||
|
|
||
| changesChan := make(chan TransformChange, 1024) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can buy into the fact that this should be a buffered channel (not block on every send until a receive occurs). Why 1024? I assume that's just some arbitrary value? Any idea what the rate of production/consumption will look like? Point being that you may have to mess with this value to get good performance, although I have very little context on what the data even is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly arbitrary, based off some quick googling and looking around at what we use elsewhere. I believe the input componet uses a similar size so I ran with that, but very much open to input on this as it is outside my wheelhouse.
As for the data, it will be completely up to the user and how they decide to emit changes. For example, in the fake I run an animation loop at about 10fps that triggers 3-4 updates each frame.
Maybe the best choice here is to not define the size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not define the size
I think you'll certainly want some buffer, as defining no buffer will mean that any send to the channel blocks until there is a receive, which will not allow the client to store transform changes from the server while the user is not pulling from changesChan. 1024 seems like a reasonable starting value here, and if your "fake" setup seems to be streaming changes well enough, then I think it's a fine place to start. I realize we probably don't have a lot of data/intuition on exactly how this API is going to be used yet, but the channel size here is an internal detail that can certainly be changed after release.
services/worldstatestore/client.go
Outdated
| func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (<-chan TransformChange, error) { | ||
| ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges") | ||
| defer span.End() | ||
|
|
||
| ext, err := protoutils.StructToStructPb(extra) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext} | ||
| stream, err := c.client.StreamTransformChanges(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // Check the initial response immediately to catch early errors. | ||
| _, err = stream.Recv() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| changesChan := make(chan TransformChange, 1024) | ||
|
|
||
| go func() { | ||
| defer close(changesChan) | ||
|
|
||
| for { | ||
| resp, err := stream.Recv() | ||
| if err != nil { | ||
| if errors.Is(err, io.EOF) { | ||
| return | ||
| } | ||
| if ctx.Err() != nil || errors.Is(err, context.Canceled) { | ||
| c.logger.Debug(err) | ||
| return | ||
| } | ||
| c.logger.Errorw("failed to receive from stream", "error", err) | ||
| return | ||
| } | ||
|
|
||
| change := TransformChange{ | ||
| ChangeType: resp.ChangeType, | ||
| Transform: resp.Transform, | ||
| } | ||
|
|
||
| if resp.UpdatedFields != nil { | ||
| change.UpdatedFields = resp.UpdatedFields.Paths | ||
| } | ||
|
|
||
| select { | ||
| case changesChan <- change: | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return changesChan, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a look here and in the server code and compared it to some server-streaming code I've seen; I see no glaring issues. Returning a channel seems pretty ergonomic to me, but I suppose you could also return a struct that implements Next() (TransformChange, error).
Also cc @viamrobotics/netcode since a whole new service type is being added here with a non-trivial amount of code. We'll have to change some small pieces of this (ShortName()) @jmatth.
Adds the implementation for the
WorldStateStoreService.API PR: viamrobotics/api#695
Scope doc: https://docs.google.com/document/d/1ionvyBa7x3HZU_rwDPBvrAUrQjBrP6sJ10Z_xbBTue8/edit?tab=t.0#heading=h.tcicyojyqi6c